package com.markhsiu.minimq.broker.store.disk;

import static com.markhsiu.minimq.broker.store.disk.DiskUtils.INDEX_FILE_SUFFIX;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.markhsiu.minimq.core.constant.ByteUtils;

/**
 * Topic index: a small memory-mapped header file that records the read and
 * write cursors of one topic.
 *
 * <p>File layout ({@code INDEX_SIZE} = 30 bytes, integers stored big-endian):
 * <pre>
 *   offset  0, 6 bytes : magic "minimq"
 *   offset  6, 4 bytes : read file number
 *   offset 10, 4 bytes : read position
 *   offset 14, 4 bytes : total read count
 *   offset 18, 4 bytes : write file number
 *   offset 22, 4 bytes : write position
 *   offset 26, 4 bytes : total write count
 * </pre>
 *
 * <p>Every {@code putXxx} call stores the value in the mapped file and in the
 * matching in-memory field; the getters only return the in-memory field.
 * Call {@link #sync()} to force the mapping to disk and {@link #close()} to
 * release the mapping and the file handle.
 *
 * @author Mark Hsiu
 */
public class TopicDiskIndex {

	private static final Logger logger = LoggerFactory.getLogger(TopicDiskIndex.class);

	private static final String MAGIC = "minimq";// 6 bytes
	// Encoded once with an explicit charset so the header never depends on the platform default.
	private static final byte[] MAGIC_BYTE = MAGIC.getBytes(StandardCharsets.US_ASCII);
	private static final int MAGIC_SIZE = 6;
	private static final int INDEX_SIZE = 30;
	private static final int READ_NUM_OFFSET = 6;// 4 bytes
	private static final int READ_POS_OFFSET = 10;// 4 bytes
	private static final int READ_CNT_OFFSET = 14;// 4 bytes
	private static final int WRITE_NUM_OFFSET = 18;// 4 bytes
	private static final int WRITE_POS_OFFSET = 22;// 4 bytes
	private static final int WRITE_CNT_OFFSET = 26;// 4 bytes

	private volatile int readNum; // file number being read
	private volatile int readPosition; // read position
	private volatile int readCount; // total number read
	private volatile int writeNum; // file number being written
	private volatile int writePosition; // write position
	private volatile int writeCount; // total number written

	private RandomAccessFile indexFile;
	private FileChannel indexChannel;
	// Separate buffer views for the read fields and the write fields; both share the same mapped content.
	private MappedByteBuffer writeIndex;
	private MappedByteBuffer readIndex;
	private final String topic;

	/**
	 * Opens (or creates) the index stored in {@code file}.
	 *
	 * @param file  index file; created and initialised to zeros when it does not exist
	 * @param topic topic name this index belongs to
	 * @throws IllegalArgumentException if an existing file does not start with the expected magic
	 * @throws IllegalStateException    if the file cannot be opened or mapped
	 */
	public TopicDiskIndex(File file, String topic) {
		this.topic = topic;
		init(file);
	}

	/**
	 * Opens (or creates) the index file named {@code fileDir + topic + INDEX_FILE_SUFFIX}.
	 * The path is built by plain concatenation, so {@code fileDir} must already
	 * end with a path separator.
	 *
	 * @param fileDir directory prefix of the index file
	 * @param topic   topic name this index belongs to
	 * @throws IllegalArgumentException if an existing file does not start with the expected magic
	 * @throws IllegalStateException    if the file cannot be opened or mapped
	 */
	public TopicDiskIndex(String fileDir, String topic) {
		this.topic = topic;
		File file = new File(fileDir + topic + INDEX_FILE_SUFFIX);
		logger.debug(topic);
		init(file);
	}

	/**
	 * Maps the index file and either loads the stored cursors (existing file)
	 * or writes a fresh header with all cursors at zero (new file).
	 * On any failure the already acquired resources are released before the
	 * exception is propagated, so no half-initialised instance is left behind.
	 */
	private void init(File file) {
		// Must be evaluated before opening: "rw" mode creates a missing file.
		boolean existing = file.exists();
		try {
			indexFile = new RandomAccessFile(file, "rw");
			indexChannel = indexFile.getChannel();
			writeIndex = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, INDEX_SIZE);
			writeIndex = writeIndex.load();
			readIndex = (MappedByteBuffer) writeIndex.duplicate();

			if (existing) {
				loadExisting();
			} else {
				writeInitial();
			}
		} catch (IOException e) {
			release();
			throw new IllegalStateException("failed to open topic index file " + file, e);
		} catch (RuntimeException e) {
			// e.g. magic mismatch: do not leak the open file and mapping.
			release();
			throw e;
		}
	}

	/** Verifies the magic and loads the six cursors from the mapped header. */
	private void loadExisting() {
		byte[] magic = ByteUtils.newByte(MAGIC_SIZE);
		readIndex.position(0);
		readIndex.get(magic, 0, MAGIC_SIZE);
		if (!ByteUtils.compare(magic, MAGIC_BYTE)) {
			throw new IllegalArgumentException("magic mismatch");
		}
		readNum = readIndex.getInt(READ_NUM_OFFSET);
		readPosition = readIndex.getInt(READ_POS_OFFSET);
		readCount = readIndex.getInt(READ_CNT_OFFSET);
		writeNum = writeIndex.getInt(WRITE_NUM_OFFSET);
		writePosition = writeIndex.getInt(WRITE_POS_OFFSET);
		writeCount = writeIndex.getInt(WRITE_CNT_OFFSET);
	}

	/** Writes the magic and zeroes every cursor of a newly created index file. */
	private void writeInitial() {
		putMagic();
		putReadNum(0);
		putReadPosition(0);
		putReadCount(0);
		putWriteNum(0);
		putWritePosition(0);
		putWriteCount(0);
	}

	/**
	 * Stores the read file number in the mapped header and in memory.
	 *
	 * @param readNum new read file number
	 */
	public void putReadNum(int readNum) {
		// Absolute put: does not depend on (or disturb) the buffer's cursor.
		this.readIndex.putInt(READ_NUM_OFFSET, readNum);
		this.readNum = readNum;
	}

	/**
	 * Stores the total read count in the mapped header and in memory.
	 *
	 * @param readCount new total read count
	 */
	public void putReadCount(int readCount) {
		this.readIndex.putInt(READ_CNT_OFFSET, readCount);
		this.readCount = readCount;
	}

	/**
	 * Stores the read position in the mapped header and in memory.
	 *
	 * @param readPosition new read position
	 */
	public void putReadPosition(int readPosition) {
		this.readIndex.putInt(READ_POS_OFFSET, readPosition);
		this.readPosition = readPosition;
	}

	/** Writes the 6-byte magic at the start of the mapped header. */
	public void putMagic() {
		this.writeIndex.position(0);
		this.writeIndex.put(MAGIC_BYTE);
	}

	/**
	 * Stores the write position in the mapped header and in memory.
	 *
	 * @param writePosition new write position
	 */
	public void putWritePosition(int writePosition) {
		this.writeIndex.putInt(WRITE_POS_OFFSET, writePosition);
		this.writePosition = writePosition;
	}

	/**
	 * Stores the write file number in the mapped header and in memory.
	 *
	 * @param writeNum new write file number
	 */
	public void putWriteNum(int writeNum) {
		this.writeIndex.putInt(WRITE_NUM_OFFSET, writeNum);
		this.writeNum = writeNum;
	}

	/**
	 * Stores the total write count in the mapped header and in memory.
	 *
	 * @param writeCount new total write count
	 */
	public void putWriteCount(int writeCount) {
		this.writeIndex.putInt(WRITE_CNT_OFFSET, writeCount);
		this.writeCount = writeCount;
	}

	/** @return the read file number */
	public int getReadNum() {
		return readNum;
	}

	/** @return the read position */
	public int getReadPosition() {
		return readPosition;
	}

	/** @return the total read count */
	public int getReadCount() {
		return readCount;
	}

	/** @return the write file number */
	public int getWriteNum() {
		return writeNum;
	}

	/** @return the write position */
	public int getWritePosition() {
		return writePosition;
	}

	/** @return the total write count */
	public int getWriteCount() {
		return writeCount;
	}

	/** Forces the mapped header to disk; does nothing once the index is closed. */
	public void sync() {
		MappedByteBuffer buffer = writeIndex;
		if (buffer != null) {
			buffer.force();
		}
	}

	/**
	 * Rebases the counters: the read count becomes 0 and the write count becomes
	 * the previous difference {@code writeCount - readCount}. When that
	 * difference is 0 and reader and writer are on the same file number, both
	 * positions are reset to 0 as well.
	 */
	public void reset() {
		int size = writeCount - readCount;
		putReadCount(0);
		putWriteCount(size);
		if (size == 0 && readNum == writeNum) {
			putReadPosition(0);
			putWritePosition(0);
		}
	}

	/**
	 * Forces pending changes to disk, unmaps the header and closes the file.
	 * Safe to call more than once; close failures are logged, not thrown.
	 */
	public void close() {
		sync();
		release();
	}

	/**
	 * Releases the mapping, the channel and the file, tolerating a partially
	 * initialised instance. The file is closed even when closing the channel fails.
	 */
	private void release() {
		try {
			if (writeIndex != null) {
				ByteUtils.cleanMappedByte(writeIndex);
				writeIndex = null;
				readIndex = null;
			}
			if (indexChannel != null) {
				indexChannel.close();
			}
		} catch (IOException e) {
			logger.error("close fqueue index file failed", e);
		} finally {
			indexChannel = null;
			if (indexFile != null) {
				try {
					indexFile.close();
				} catch (IOException e) {
					logger.error("close fqueue index file failed", e);
				}
				indexFile = null;
			}
		}
	}

	/** @return the topic name this index belongs to */
	public String getTopic() {
		return topic;
	}

	/**
	 * Returns the topic and all six cursors as {@code key=value} pairs.
	 * Numbers are printed as plain digits (no locale-dependent grouping).
	 */
	@Override
	public String toString() {
		return "topic=" + topic
				+ " readNum=" + readNum
				+ " readCount=" + readCount
				+ " readPosition=" + readPosition
				+ " writeNum=" + writeNum
				+ " writeCount=" + writeCount
				+ " writePosition=" + writePosition;
	}
}
